package ES6.demo;

import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.List;
import java.util.Map;


/**
 * Static helpers for writing documents to Elasticsearch through the shared
 * {@link TransportClient} returned by {@code ESUtils.getClient()}: single-document
 * indexing, bulk indexing, overwriting and deleting.
 *
 * Created by dong on 2018/4/23.
 */
public class IndexUtils {

    /**
     * Indexes a single document (article, application, ...) supplied as a JSON string.
     * The string is parsed into a map and handed to
     * {@link #addIndex(String, String, String, Map)}.
     *
     * @param index name of the index to write to
     * @param type  mapping type of the document
     * @param id    unique identifier of the document
     * @param json  document content as a JSON object string
     * @return the numeric HTTP-style status code of the index response
     * @throws ClassCastException if {@code json} does not parse to a JSON object
     */
    public static int addIndex(String index, String type, String id, String json){
        // JSON.parse returns Object; the cast fails for anything that is not a map.
        @SuppressWarnings("unchecked")
        Map<String, Object> document = (Map<String, Object>) JSON.parse(json);
        return addIndex(index, type, id, document);
    }

    /**
     * Indexes a single document (article, application, ...) supplied as a map and
     * prints a one-line summary of the response to standard output.
     *
     * NOTE (from the original author): the mapping has to be put on the index
     * separately, otherwise the analyzer will not take effect. This method does not
     * do that.
     *
     * @param index name of the index to write to
     * @param type  mapping type of the document
     * @param id    unique identifier of the document
     * @param json  document content as field name / value pairs
     * @return the numeric HTTP-style status code of the index response
     */
    public static int addIndex(String index, String type, String id, Map<String, Object> json){
        TransportClient client = ESUtils.getClient();
        IndexResponse indexResponse = client.prepareIndex(index, type, id).setSource(json).get();

        // Status reported by Elasticsearch for this write.
        RestStatus status = indexResponse.status();

        // Summary: index name, type name, document id, version (1 on first index), status.
        System.out.println(indexResponse.getIndex() + "_" + indexResponse.getType() + "_"
                + indexResponse.getId() + "_" + indexResponse.getVersion() + "_" + status);

        return status.getStatus();
    }

    /**
     * Bulk-indexes a list of JSON documents through a {@link BulkProcessor}.
     *
     * NOTE(review): every request is created with the same {@code id}, so with a
     * non-null id each document targets the same document id and later entries
     * overwrite earlier ones. Confirm whether callers are expected to pass
     * {@code null} here.
     *
     * NOTE(review): the processor is closed without waiting for an in-flight bulk
     * request, so this method may return before the writes are acknowledged; outcomes
     * are only reported through the listener's standard-output messages.
     *
     * @param index name of the index to write to
     * @param type  mapping type of the documents
     * @param id    document identifier applied to every request
     * @param jsons documents to index, each a JSON object string
     * @return always {@code 1}
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public static int bulkIndex(String index, String type, String id, List<String> jsons) throws IOException {

        TransportClient client = ESUtils.getClient();

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Runs before a bulk request is sent; request.numberOfActions() gives the batch size.
                System.out.println("executionId = [" + executionId + "], request = [" + request + "]");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Runs after a bulk request completed; response.hasFailures() tells whether items failed.
                System.out.println("executionId = [" + executionId + "], request = [" + request + "], response = [" + response + "]");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Runs when the bulk request itself failed with a Throwable.
                System.out.println("executionId = [" + executionId + "], request = [" + request + "], failure = [" + failure + "]");
            }
        };

        // try-with-resources guarantees the processor (and its flush scheduler) is
        // closed even when adding a request throws.
        try (BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
                .setBulkActions(10)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setConcurrentRequests(1)
                .setFlushInterval(TimeValue.timeValueMillis(1))
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build()) {

            for (String json : jsons) {
                // The content type must be given explicitly: a lone String argument would
                // bind to source(Object...), which expects field/value pairs and rejects
                // an odd number of arguments.
                bulkProcessor.add(new IndexRequest(index, type, id).source(json, XContentType.JSON));
            }

            bulkProcessor.flush();
        }

        return 1;
    }


    /**
     * Writes the given JSON as the document with the given id. This issues an index
     * request, i.e. the supplied JSON becomes the whole document source rather than
     * being merged into an existing one.
     *
     * @param index name of the index to write to
     * @param type  mapping type of the document
     * @param id    unique identifier of the document
     * @param json  new document content as a JSON object string
     * @return the numeric HTTP-style status code of the index response
     */
    public static int updateIndex(String index, String type, String id, String json){
        TransportClient client = ESUtils.getClient();
        IndexResponse response = client.prepareIndex(index, type, id)
                .setSource(json, XContentType.JSON)
                .get();
        return response.status().getStatus();
    }

    /**
     * Deletes the document with the given id.
     *
     * @param index name of the index that holds the document
     * @param type  mapping type of the document
     * @param id    unique identifier of the document
     * @return the numeric HTTP-style status code of the delete response
     */
    public static int deleteIndex(String index, String type, String id){
        TransportClient client = ESUtils.getClient();
        DeleteResponse response = client.prepareDelete(index, type, id).execute().actionGet();

        return response.status().getStatus();
    }
}
